RxJava2.x 线程切换解析

RxJava2.x 线程切换解析

前言

之所以 RxJava 能在安卓开发中如此受欢迎,其中一个重要的原因就是在于 RxJava 切换线程的便利性特别适用于 Android 中主线程不能执行耗时操作的规定,配合 Retrofit 适用能对于网络 IO 操作提供非常大的便利。

这篇文章的目的就是深入解析RxJava 中关于线程切换的原理。在阅读下文前,你可以先阅读《RxJava2.x 订阅流程解析 》一文作为基础。

线程切换流程

RxJava2.x 中线程切换方法和 RxJava1.x 中没有区别,主要通过 Observable.subscribeOn 和 Observable.observeOn 两个方法实现,前者主要是规定事件源代码执行的线程,而后者则是规定下一个操作符中代码执行的线程,我们已一个简单的流程为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("Observer.ObservableEmitter线程: " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
})
// 规定了上面 subscribe 方法执行的线程
.subscribeOn(Schedulers.io())
// 规定了下面 Observer 中 onNext 方法执行的线程
.observeOn(Schedulers.computation())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer integer) {
System.out.println("Observer.onNext线程: " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});

输出结果中,被观察者事件源代码执行在 RxCachedThreadScheduler-1 即 Schedulers.io() 线程中,观察者代码执行在了 RxComputationThreadPool-1 即 Schedulers.computation() 线程中

值得注意的是 subscribeOn 方法只在第一次执行时有效,就是说被观察者只会跑在第一次 subscribeOn 规定的线程上(没有就是当前线程),至于看完后面就能知道,我们先从 observeOn 方法开始

observeOn 流程解析

Observable.observeOn 方法

1
2
3
4
5
6
7
8
9
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

由此可以得知,切换线程的关键就在于 ObservableObserveOn 的 subscribeActual 方法,而且由于 observeOn 是作用于观察者的回调方法,我们可以得知肯定是对 observer 做了某种程度的封装,使其可以切换线程执行回调

ObservableObserveOn.subscribeActual 方法

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 该调度器会使线程执行在原有线程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 真正能切换线程的调度器在这里
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

我们可以看到方法内确实对出传入的 observer 进行了封装,那我们可以深入的去查看 ObserveOnObserver 中继承自 Observer 接口的方法,以 onNext 的实现为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void onNext(T t) {
// 放重入判断
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
// 将上游获取的数据缓存到队列中
queue.offer(t);
}
schedule();
}

/**
* 真正的调度方法
*/
void schedule() {
// 原子操作,保证并发安全
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

切换线程交由 worker 去执行,而 worker 是有调度器scheduler.createWorker 方法创建的,其实只是让 worker 去执行一个异步任务(具体流程会在后面解析 Schedulers 中的默认调度器实现时提及),而 ObserveOnObserver 也实现了 runnable 接口,所以后续逻辑需要到 run 方法中去寻找

ObserveOnObserver.run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
// 正常流程
drainNormal();
}
}

void drainNormal() {
int missed = 1;
// 上游被观察者回调的对象队列
final SimpleQueue<T> q = queue;
// actual是传入的下游的观察者
final Observer<? super T> a = actual;

for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
// 队列中去数据,与线程执行前的存数据对应
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}
// 传递给下游观察者
a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

这流程就比较清晰了,在 ObserveOnObserver 封装的观察者中,会以队列的形式存储上游回调来的数据,再以调度器生成的 worker 做线程切换的动作,在线程上执行时从队列中取数据再交由下游的观察者继续执行回调,从而完成流程执行过程中的线程切换

subscribeOn 流程解析

那回过头来看 subscribeOn 方法,该方法的切换时机肯定是在 subscribeActual 方法执行的时候了

Observable.subscribeOn 方法

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

同理可以,具体的流程应该在 ObservableSubscribeOn.subscribeActual 方法

ObservableSubscribeOn.subscribeActual 方法

1
2
3
4
5
6
7
8
9
@Override
public void subscribeActual(final Observer<? super T> s) {
// 对线程安全做了包装
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// onSubscribe 方法并没有执行在 subscribeOn 的线程上
s.onSubscribe(parent);
// 这里才是真正切换订阅线程的地方
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

scheduler.scheduleDirect 方法内部其实也是通过 createWorker 去创建一个 worker 通过 schedule 方法执行线程,而线程的实现则是在 SubscribeTask 中的 run 方法上

SubscribeTask.run 方法

1
2
3
4
5
@Override
public void run() {
// SubscribeTask 是 ObservableSubscribeOn 的内部类,所以 source.subscribe(parent) 订阅方法变成了指定线程执行
source.subscribe(parent);
}

因为订阅的调用是在指定线程上执行的,所以被观察者的代码执行也是在该线程上的,订阅流程中的多次切换其实只有最靠近被观察者的 subscribeOn 会影响被观察者,并不是没有发生线程切换,只是又被切到了其他线程,其实在 onSubscribe 方法里是可以观测到线程的切换的

Scheduler 解析

线程的切换都是通过调度器创建的 worker 的 schedule 方法去实现的,而该方法是一个抽象方法,具体的逻辑由各自的子类实现,其实不难发现其中最重要的一个子类就是 NewThreadWorker,而 NewThreadWorker.schedule 最终调用的则是 NewThreadWorker.scheduleActual

NewThreadWorker.scheduleActual 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { 
// 对线程任务进行包装,多数是订阅或者回调线程的切换
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
// 由线程池去执行任务
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

这个类中的线程池在构造函数中创建

1
2
3
4
public NewThreadWorker(ThreadFactory threadFactory) {
// SchedulerPoolFactory 中有线程池的管理,会定时清理无用的线程池
executor = SchedulerPoolFactory.create(threadFactory);
}

可以看出最终线程还是会交由线程池中执行,而线程池的管理则是 Scheduler 最主要的工作了,我们以常用的 io 调度器为例

io 调度器解析

在 Schedulers.io() 方法中经过多次追踪可以发现其实现为 IoScheduler 类,那么可以通过 createWorker 方法作为切入点

IoScheduler.createWorker 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public Worker createWorker() {
// pool 指 AtomicReference<CachedWorkerPool>
return new EventLoopWorker(pool.get());
}

static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 这里从对象池中获取 threadWorker,可以避免频繁创建对象的内存开销
this.threadWorker = pool.get();
}

@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();

// releasing the pool should be the last action
pool.release(threadWorker);
}
}

@Override
public boolean isDisposed() {
return once.get();
}

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
// 真正执行 schedule 方法的是 threadWorker
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

我们可以看出,创建的 EventLoopWorker 只是一个壳,而真正的 worker 则是从 CachedWorkerPool 中获取的,而 CachedWorkerPool 实现是一套自己的管理 worker 实例的逻辑,是了实现 io 线程即能动态调整大小,又能实现缓存的效果

CachedWorkerPool 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
static final class CachedWorkerPool implements Runnable {
// 每个 ThreadWorker 执行完毕后的存活时间
private final long keepAliveTime;
// 存放 ThreadWorker 的线程安全的队列,以存活时间大小排序
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
// ThreadWorker 的引用
final CompositeDisposable allWorkers;
// 内部线程池,用来执行 ThreadWorker 的清理工作,注意 CachedWorkerPool 是继承了 Runnable,所以相关代码可以在 run 方法里看到
private final ScheduledExecutorService evictorService;
// 执行清理工作的线程,线程名带 RxCachedWorkerPoolEvictor 前缀
private final Future<?> evictorTask;
// ThreadWorker 线程创建时的工程,一般 io 线程的名字规范是带 RxCachedThreadScheduler 前缀
private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
// 这是一个周期运行的线程任务,作用是定时清理缓存队列
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}

@Override
public void run() {
evictExpiredWorkers();
}

/**
* 这里就是获取真正执行 worker 的代码,会从缓存中取,如果没有才会新建
*/
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

// 如果没有缓存的 worker 新建一个
ThreadWorker w = new ThreadWorker(threadFactory);
// 引用持有,可以随时终端线程执行
allWorkers.add(w);
return w;
}

/**
* 将执行完毕的线程释放,更新存活时间后放入缓存队列中,在 EventLoopWorker 被 dispose 是执行
*/
void release(ThreadWorker threadWorker) {
threadWorker.setExpirationTime(now() + keepAliveTime);

expiringWorkerQueue.offer(threadWorker);
}

/**
* 真正负责清理工作的代码
*/
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();

for (ThreadWorker threadWorker : expiringWorkerQueue) {
// 通过判断存活时间是否过期来决定是否移除缓存,被移除后最终会被垃圾回收,因为是个有序的队列所以只要找到第一个不过期的 worker 就可以中断循环了
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
break;
}
}
}
}

long now() {
return System.nanoTime();
}

/**
* 中断操作
*/
void shutdown() {
allWorkers.dispose();
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
}
}

通过这个管理池,我们可以清晰地了解 Schedulers.io() 的运行特性,它的 worker 没有上限,但是会有一个 keepAliveTime 存活时间的限制,当 worker 执行完毕后会被存入到一个缓存队列中,而管理 worker 的对象池会以一个定时任务去清理过期的 worker,未过期的 worker 有机会得到重用。这就实现了一个可以自适应大小的缓存池。

那另外一个 Schedulers.computation() 也类似,只不过它里面负责管理 worker 的 FixedSchedulerPool 已经是固定大小的了(cpu 核心数),逻辑也更简单点,有兴趣的话可以去查看一些其他的调度器实现

Android 主线程调度器

AndroidSchedulers.mainThread() 是 rxandroid 提供的主线程切换调度器,也是我们最常用的调度器之一,理解了它也有助于我们对 Android 开发的线程间通信有更好的认识

AndroidSchedulers.mainThread() 追溯进去发现具体逻辑由 HandlerScheduler 实现

1
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

它会持有一个主线程 handler 的引用

HandlerWorker.schedule 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {
return Disposables.disposed();
}

run = RxJavaPlugins.onSchedule(run);

// 将 handler 和 run 封装成 dispose,同时作为 callback 传递给 message,是线程得以在主线程运行
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // 作为 dispose 时 handler removeCallbacksAndMessages 的 token

handler.sendMessageDelayed(message, unit.toMillis(delay));

// 对状态进行校验,实现取消订阅功能
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}

return scheduled;
}

AndroidSchedulers.mainThread() 只是通过 Android 中的 handler 特性将线程切换到了主线程执行

结语

对于 Rxjava 的线程切换也有了较为全面的认知了,接下去要分析的网络三剑客中的 retrofit, retrofit 更是一个设计模式的集大成者。